package org.zgjy;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.zgjy.mapper.MqttMsgMapper;
import org.zgjy.model.MqttMsg;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Scheduled MQTT bridge.
 *
 * <p>Every five seconds {@link #run()} lazily builds the shared MQTT client, makes sure it is
 * connected, and then walks the message queues of every registered {@link ModbusTimerTask},
 * inserting each message whose {@code sendTime} is still empty through {@link MqttMsgMapper}.
 *
 * <p>All client-related state is kept in public static fields, so it is shared by every
 * instance of this class.
 */
@Component
@EnableScheduling
public class MqttService{

    @Autowired
    private MqttMsgMapper mqttMsgMapper;

    // SECURITY NOTE(review): the access key and secret key below are committed in source.
    // They should be moved to external configuration and rotated; the values are left
    // untouched here so that runtime behavior does not change.
    final static String instanceId = "post-cn-zz11xu4zl0g";
    final static String endPoint = "post-cn-zz11xu4zl0g.mqtt.aliyuncs.com";
    final static String accessKey = "LTAI4G1yvWe6pvnEjYq7xt8N";
    final static String secretKey = "et3A0Ycy0SF7gxnft059uf6rnRtZZM";
    final static String clientId = "GID_wsc_mqtt@@@DCServer_huayang";
    final static String fieldsDcsUp = "fieldsDcsUp";
    final static String fieldsDcsDown = "fieldsDcsDown";
    final static String factoryCode = "huayang";
    final static String topic_up = fieldsDcsUp + "/" + factoryCode;
    final static String topic_down = fieldsDcsDown + "/" + factoryCode;
    final static int qosLevel = 1;

    public static MqttClient mqttClient;
    public static ConnectionOptionWrapper connectionOptionWrapper ;
    public static MemoryPersistence memoryPersistence ;
    public static ExecutorService executorService ;
    public static MqttConnectOptions options ;


    public static volatile ArrayList<ModbusTimerTask> modbusTimerTasks = new ArrayList<>();

    /**
     * Guards structural access to {@link #modbusTimerTasks} made through this class.
     * {@code volatile} only publishes the list reference; it does not make the
     * {@code ArrayList} itself safe for concurrent add/iterate. Code that mutates the
     * public field directly bypasses this lock.
     */
    private static final Object TASKS_LOCK = new Object();

    /**
     * Registers a task whose {@code msgQueue} will be scanned on every scheduled run.
     *
     * @param modbusTimerTask the task to register
     */
    public static void add_message_source(ModbusTimerTask modbusTimerTask){
        synchronized (TASKS_LOCK) {
            modbusTimerTasks.add(modbusTimerTask);
        }
    }

    /**
     * Scheduled entry point, fired every five seconds.
     *
     * <p>Steps: (1) lazily create the shared client objects, (2) connect if the client is
     * not currently connected, (3) persist pending messages. A failure in step 1 skips the
     * rest of this run; a failure to connect is logged and retried on the next run without
     * blocking step 3. No exception escapes this method.
     */
    @Scheduled(cron="*/5 * * * * ?")
    public void run() {

        Thread currentThread = Thread.currentThread();
        System.out.println("###*** MqttService run, ThreadID: " + currentThread.getId() + " , ThreadName: " + currentThread.getName());
        try {
            initSharedState();
            connectIfNeeded(currentThread);
            persistPendingMessages();
        }catch (Exception e){
            System.out.println(e);
            // Keep the stack trace: the one-line toString() alone hides where the failure came from.
            e.printStackTrace();
        }
    }

    /**
     * Lazily creates the connection-option wrapper, persistence store, callback executor
     * and MQTT client. Objects that already exist are left untouched.
     *
     * @throws Exception if any of the underlying constructors fails
     */
    private static void initSharedState() throws Exception {
        if (connectionOptionWrapper == null) {
            connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        }
        if (memoryPersistence == null) {
            memoryPersistence = new MemoryPersistence();
        }
        if (executorService == null) {
            // Single worker thread: subscribe requests triggered by the callback run one at a time.
            executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
        }
        if (mqttClient == null) {
            MqttClient client = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            client.setTimeToWait(3000);
            client.setCallback(newCallback());
            // Publish the client only once it is fully configured.
            mqttClient = client;
        }
    }

    /**
     * Connects the shared client when it is not currently connected.
     *
     * <p>Previously the connect call was made only in the same run that created the client,
     * so a failed first attempt left a permanently disconnected client. The attempt is now
     * repeated on every run until {@code isConnected()} reports true. Failures are logged
     * here rather than propagated so that message persistence still runs.
     *
     * @param currentThread the scheduler thread, used only for log output
     */
    private static void connectIfNeeded(Thread currentThread) {
        if (mqttClient.isConnected()) {
            return;
        }
        System.out.println("===### mqttClient connecting begin ! ###===" + mqttClient.toString() + " ### ThreadID: " + currentThread.getId() + " , ThreadName: " + currentThread.getName());
        try {
            if (options == null) {
                options = connectionOptionWrapper.getMqttConnectOptions();
            }
            mqttClient.connect(options);
            System.out.println("===### mqttClient connecting end ! ###===");
        } catch (Exception e) {
            // Also reached when a connect attempt is already in progress; the next run re-checks.
            System.out.println("### mqttClient connect failed, will retry on next run: " + e);
            e.printStackTrace();
        }
    }

    /**
     * Builds the client callback: subscribes to {@code topic_down} after each completed
     * connection and logs lost connections, arrived messages and completed deliveries.
     *
     * @return a new callback instance
     */
    private static MqttCallbackExtended newCallback() {
        return new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("===### Connect success ! ###===");
                // The subscribe call is handed to the executor instead of being made on the callback thread.
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("### executorService.submit Runnable ! ###");
                        try {
                            final String[] topicFilter = {topic_down};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
                System.out.println("### Connect lost ! ###");
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                // Decode with an explicit charset rather than the platform default.
                // NOTE(review): assumes payloads are UTF-8 text; confirm with the publisher.
                System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                // getTopics() is documented as possibly returning null, so guard before indexing.
                String[] topics = iMqttDeliveryToken.getTopics();
                String topic = (topics != null && topics.length > 0) ? topics[0] : "<unknown>";
                System.out.println("### send msg succeed topic is : " + topic);
            }
        };
    }

    /**
     * Inserts, through {@link MqttMsgMapper}, every queued message whose {@code sendTime}
     * is empty. Messages with a non-empty {@code sendTime} are skipped.
     *
     * <p>NOTE(review): publishing to {@code topic_up} was commented out in the original code,
     * so nothing is sent over MQTT here; the unused {@code MqttMessage} that used to be built
     * for each entry has been removed.
     *
     * <p>NOTE(review): {@code sendTime} is not updated in this class, so unless other code
     * sets it or drains the queue, the same message is inserted again on every run.
     */
    private void persistPendingMessages() {
        // Iterate over a snapshot so that concurrent registrations cannot disturb the loop.
        List<ModbusTimerTask> tasks;
        synchronized (TASKS_LOCK) {
            tasks = new ArrayList<ModbusTimerTask>(modbusTimerTasks);
        }

        for (ModbusTimerTask task : tasks) {
            if (task == null || task.msgQueue == null) {
                continue;
            }
            for (int j = 0; j < task.msgQueue.size(); j++) {
                MqttMsg mqttMsg = task.msgQueue.get(j);
                if (mqttMsg == null) {
                    continue;
                }
                // A null sendTime is treated like an empty one (not yet sent) instead of
                // aborting the whole run with a NullPointerException.
                // NOTE(review): confirm that null and "" carry the same meaning for MqttMsg.
                if (mqttMsg.sendTime != null && !mqttMsg.sendTime.equals("")) {
                    continue;
                }
                mqttMsgMapper.insert(mqttMsg);
            }
        }
    }
}